
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;

import javax.annotation.Nullable;


public class WindowAll {


    public static void main(String[] args) throws Exception
    {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> dataStream = env
                .socketTextStream("Desktop", 3456).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
                    long currentTimeStamp = 0L;
                    long maxDelayAllowed = 5000L;
                    long currentWaterMark;

                    @Override
                    public Watermark getCurrentWatermark() {
                        currentWaterMark = currentTimeStamp - maxDelayAllowed;
                        System.out.println("当前水位线:" + currentWaterMark);
                        return new Watermark(currentWaterMark);
                    }

                    @Override
                    public long extractTimestamp(String s, long l) {
                        String[] arr = s.split(",");
                        long timeStamp = Long.parseLong(arr[1]);
                        currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
                        System.out.println("Key:" + arr[0] + ",EventTime:" + timeStamp + ",前一条数据的水位线:" + currentWaterMark);
                        return timeStamp;
                    }

                });
//    在设置WaterMark方法中，先调用extractTimestamp方法，再调用getCurrentWatermark方法

        dataStream.map(new MapFunction<String, Tuple2<String, String>>()

        {
            @Override
            public Tuple2<String, String> map (String s) throws Exception {
                return new Tuple2<String, String>(s.split(",")[0], s.split(",")[1]);
            }
        }).

                keyBy(0)
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
                        //window的作用就是所有数据让一个taskmanager来处理,并行度只有1
                .fold("Start:",new FoldFunction<Tuple2<String, String>, String>()
                        {
                            @Override
                            public String fold (String s, Tuple2 < String, String > o) throws Exception
                            {
                                return s + " - " +o.f1;
                            }
                        }).

                print();

        env.execute("WaterMark Test Demo");
    }
}



/*
代码来自:
//http://www.louisvv.com/archives/2225.html
*/